-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Post Service #130
Post Service #130
Conversation
Codecov Report
@@ Coverage Diff @@
## main #130 +/- ##
==========================================
- Coverage 94.82% 92.31% -2.51%
==========================================
Files 20 25 +5
Lines 2645 2966 +321
==========================================
+ Hits 2508 2738 +230
- Misses 137 228 +91
|
f0757da
to
72abb74
Compare
73a6996
to
6e2b626
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(still reading code)
log::error!("Got unexpected response: {:?}", resp); | ||
} | ||
} | ||
sleep(Duration::from_secs(5)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the purpose of sleep?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, this is a test binary I created for manually testing the post service "on the go". It continuously commands to create the proof in a loop. which I think I will remove entirely as the functionality is covered in tests now.
The sleep is there only to span out the calls to the post-service.
GenProofRequest, GenProofResponse, GenProofStatus, Proof, ProofMetadata, ServiceResponse, | ||
}; | ||
use tokio::sync::mpsc; | ||
use tokio::time::sleep; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think implementation will be simpler with 2 threads that communicate over a single subscriber channel. one thread connects to node and dispatches commands from node, and another doing the work and writes responses.
or it is not worth it and using tokio (other event loop) is just a good default option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely sure what you mean exactly. Do you mean to have an actor-like pattern where the client and service run in a loop and communicate via a channel instead of just directly calling service methods from the client? It is possible, but IMHO will complicate the code as the calls would become "async" (send on a channel and await response on another channel).
}; | ||
let res = self.register_and_serve(client).await; | ||
log::info!("disconnected: {res:?}"); | ||
sleep(self.reconnect_interval).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to stop service while it is blocked on sleep here? by terminating tokio or future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the application will quit on SIGINT.
}; | ||
|
||
let response = client.register(Request::new(outbound)).await?; | ||
let mut inbound = response.into_inner(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you help me to understand why you can't write to inbound
directly and need this intermediate channel? inbound
is not buffered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a 2xstreaming GRPC call (both client and server create its stream). The inbound is a stream of incoming messages, that the client reads from. But in order to call /Register, I first need to create the outbound stream which sends messages to the server. Now, as the flow is:
- obtain message from inbound
- process
- send response on outbound
I need a channel to pass information on point 3 from the while let Some(request) = inbound.message() {...}
loop up to the outbound.
Ok(ProofGenState::Finished { proof, metadata }) => { | ||
log::info!("proof generation finished"); | ||
if let Err(err) = self.service.verify_proof(&proof, &metadata) { | ||
log::error!("generated proof is not valid: {err:?}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it support formatted logging? otherwise it will be hard to use with loki/grafana
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but https://github.com/tokio-rs/tracing does, I will create an issue to transition to tracing/structured logs. 👍
Closes #129
Also, see spacemeshos/pm#260 for the general design.
Implementation of Post Service with support for: